package org.example.api.window;

import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import org.example.api.bean.SensorReading;

/**
 * Demo job for event-time windowing with watermarks, allowed lateness and a
 * side output for late records.
 *
 * <p>The job reads comma-separated text lines from a socket, converts each line
 * into a {@link SensorReading}, assigns event-time timestamps and watermarks,
 * and emits the reading with the minimum temperature per sensor id for every
 * 15-second window.
 *
 * @author huangqihan
 * @date 2021/2/22
 */
public class EventTimeWindowTest {

    /**
     * Builds and runs the streaming job.
     *
     * <p>Each input line is split on {@code ","} into three fields which are passed,
     * in order, to the {@link SensorReading} constructor: a string, a long and a
     * double (presumably id, timestamp in seconds, temperature — confirm against
     * {@code SensorReading}). A line with a non-numeric second or third field
     * raises {@link NumberFormatException} inside the map function.
     *
     * @param args command-line arguments; not used
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Watermark propagation is affected when the parallelism changes, so it is
        // best to assign watermarks as close to the source as possible.
        env.setParallelism(1);

        // NOTE(review): setStreamTimeCharacteristic, BoundedOutOfOrdernessTimestampExtractor,
        // timeWindow and keyBy(String) are deprecated in newer Flink releases — confirm the
        // project's Flink version before migrating them.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // Interval (in milliseconds) at which periodic watermarks are generated.
        env.getConfig().setAutoWatermarkInterval(100);

        String host = "localhost";
        int port = 7777;
        DataStreamSource<String> inputStream = env.socketTextStream(host, port);

        SingleOutputStreamOperator<SensorReading> mapStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            // valueOf replaces the deprecated new Long(String) / new Double(String)
            // constructors; parsing rules and the resulting boxed types are unchanged.
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        }).assignTimestampsAndWatermarks(
                // Tolerate records that arrive up to 2 seconds out of order.
                new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                    @Override
                    public long extractTimestamp(SensorReading ele) {
                        // Timestamps in the data are in seconds; convert to milliseconds.
                        return ele.getTimestamp() * 1000L;
                    }
                });

        // Anonymous subclass so the tag keeps its generic element type at runtime.
        OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
        };

        // Event-time windowed aggregation: minimum temperature within each 15s window.
        // The window start is determined by s = t - (t - o + ws) % ws, where
        // s = start time, t = timestamp, o = offset, ws = window size.
        SingleOutputStreamOperator<SensorReading> minByStream = mapStream.keyBy("id")
                .timeWindow(Time.seconds(15))
                // Keep window state for one more minute so late records can update the result.
                .allowedLateness(Time.minutes(1))
                // Records arriving after the allowed lateness go to the "late" side output.
                .sideOutputLateData(outputTag)
                .minBy("temperature");
        minByStream.print("minTemp");
        minByStream.getSideOutput(outputTag).print("late");

        env.execute();
    }
}
